package org.databandtech.flink.source;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Flink source that repeatedly waits a fixed number of seconds and then emits a
 * {@code (element, 1)} tuple, where the element is picked at random from a fixed array.
 *
 * <p>The picked element is converted to its string form with {@code toString()} before it
 * is emitted, so the emitted tuple really contains a {@code String} for every element type
 * {@code T} (for {@code String} elements this is the element itself).
 *
 * @param <T> type of the candidate elements
 */
public class TimerSource<T> implements SourceFunction<Tuple2<String, Integer>>{

	private static final long serialVersionUID = 4379804422031337157L;
	/** Loop flag; cleared by {@link #cancel()}, volatile so the running loop sees the change. */
	private volatile boolean isRunning = true;
	private final Random random = new Random();
	/** Candidate elements; one is picked at random for every emitted tuple. */
	T[] array;
	/** Pause before each emission, in seconds. */
	int sleep;

	/**
	 * Creates a source that emits random elements of {@code array}.
	 *
	 * @param array candidate elements; must be non-null and contain at least one element
	 * @param sleep pause before each emission, in seconds; {@code TimeUnit.sleep} does not
	 *              sleep at all for values less than or equal to zero
	 * @throws NullPointerException if {@code array} is null
	 * @throws IllegalArgumentException if {@code array} is empty
	 */
	public TimerSource(T[] array, int sleep) {
		super();
		// Fail fast here instead of inside run(), where a null array would raise an NPE and
		// an empty one would make Random.nextInt(0) throw only after the first sleep.
		if (array == null) {
			throw new NullPointerException("array must not be null");
		}
		if (array.length == 0) {
			throw new IllegalArgumentException("array must contain at least one element");
		}
		this.array = array;
		this.sleep = sleep;
	}

	/**
	 * Emits {@code (element, 1)} tuples until {@link #cancel()} is called: each iteration
	 * sleeps for {@code sleep} seconds and then collects one randomly chosen element.
	 *
	 * @param ctx context used to emit the tuples
	 * @throws Exception in particular {@link InterruptedException} if the thread is
	 *                   interrupted while sleeping
	 */
	@Override
	public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
		while (isRunning) {
			TimeUnit.SECONDS.sleep(sleep);
			// cancel() may have been called during the sleep; do not emit after that.
			if (!isRunning) {
				break;
			}
			T element = array[random.nextInt(array.length)];
			// Convert explicitly rather than casting Tuple2<T, Integer> to
			// Tuple2<String, Integer>: the cast is unchecked and would smuggle non-String
			// objects into a String-typed field. A null element is passed through as null.
			String key = (element == null) ? null : element.toString();
			Tuple2<String, Integer> tuple = Tuple2.of(key, 1);
			ctx.collect(tuple);
		}
	}

	/** Requests the emission loop in {@code run} to stop by clearing the running flag. */
	@Override
	public void cancel() {
		isRunning = false;
	}

}
